package x.ovo.wechat.bot.impl.core;

import lombok.extern.slf4j.Slf4j;
import org.dromara.hutool.core.text.StrUtil;
import org.dromara.hutool.core.thread.BlockPolicy;
import org.dromara.hutool.core.thread.ExecutorBuilder;
import org.dromara.hutool.core.thread.ThreadUtil;
import x.ovo.wechat.bot.core.Context;
import x.ovo.wechat.bot.core.contact.Contactable;
import x.ovo.wechat.bot.core.entity.Member;
import x.ovo.wechat.bot.core.event.ExceptionEvent;
import x.ovo.wechat.bot.core.event.MessageEvent;
import x.ovo.wechat.bot.core.message.Message;
import x.ovo.wechat.bot.core.message.RawMessage;
import x.ovo.wechat.bot.core.message.TextMessage;

import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 消息消费线程
 *
 * @author ovo on 2024/07/18.
 */
@Slf4j(topic = "MessageConsumer")
public class MessageConsumer implements Runnable {

    public static final ReentrantLock lock = new ReentrantLock();
    private static final Condition single = lock.newCondition();
    private final Context ctx = Context.INSTANCE;
    private ExecutorService executor;
    /** 消息队列 */
    private static final ConcurrentLinkedQueue<RawMessage> QUEUE = new ConcurrentLinkedQueue<>();

    public static void addAll(Collection<RawMessage> messages) {
        lock.lock();
        try {
            if (QUEUE.isEmpty()) single.signal();
            QUEUE.addAll(messages);
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void run() {
        this.executor = ExecutorBuilder.of()
                .setCorePoolSize(5)
                .setMaxPoolSize(20)
                .setThreadFactory(ThreadUtil.newNamedThreadFactory("executor-", true))
                .setWorkQueue(new LinkedBlockingQueue<>(50))
                .setHandler(new BlockPolicy())
                .build();
        log.info("消息转换及消息消费线程已启动");
        while (this.ctx.isRunning()) {
            try {
                this.loop();
            } catch (Throwable e) {
                log.error("消息处理时出现异常: {}", e.getMessage());
                log.debug("消息处理出现异常", e);
                new ExceptionEvent(e).fire();
            }
        }
        this.executor.shutdown();
    }

    private void loop() {
        // 队列为空则等待
        lock.lock();
        try {
            while (QUEUE.isEmpty()) {
                single.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.debug("MessageConsumer loop interrupted.");
        } finally {
            lock.unlock();
        }
        this.handle();
    }

    private void handle() {
        // 消息队列不为空，取出消息
        RawMessage rawMessage = QUEUE.poll();
        if (Objects.isNull(rawMessage)) return;

        // 转换原始消息为消息类
        Message message = MessageFactory.create(rawMessage);
        // 状态通知，每隔5分钟同步已读消息
        if (rawMessage.getCreateTime() / 60 % 5 == 0) {
            this.ctx.getApi().statusNotify(message.getFrom().getUserName());
        }
        log.info("{} -> {}: [{}] {}", this.formatName(message.isGroup() ? message.getMember() : message.getFrom()), this.formatName(message.isGroup() ? message.getFrom() : message.getTo()), message.getType().getName(), message.getContent());

        this.executor.execute(() -> {
            // 如果是指令，则执行指令
            if (message instanceof TextMessage && message.getContent().startsWith("/")) {
                try {
                    this.ctx.getCommandManager().execute((TextMessage) message);
                } catch (Exception e) {
                    log.error("指令执行 [{}] 时出现异常: {}", message.getContent(), e.getMessage());
                    log.debug("指令执行出现异常", e);
                    new ExceptionEvent(e).fire();
                    message.getFrom().sendMessage("指令执行时出现异常: " + e.getMessage());
                }
            }
            // 发送事件
            try {
                this.ctx.getEventManager().fireEvent(new MessageEvent<>(message));
            } catch (Exception e) {
                log.error("事件监听器出现异常: {}", e.getMessage());
                log.debug("消息事件出现异常", e);
                new ExceptionEvent(e).fire();
            }
        });
    }

    /**
     * 格式化联系人名称
     *
     * @return {@link String}
     */
    public String formatName(Contactable contact) {
        if (Objects.isNull(contact)) return null;
        // 获取微信昵称
        String nickName = contact.getNickName();
        // 获取备注
        String remarkName = contact.getRemarkName();
        if (contact instanceof Member) {
            remarkName = ((Member) contact).getDisplayName();
        }

        return StrUtil.format("{}{}", nickName, StrUtil.isNotBlank(remarkName) ? StrUtil.format("<{}>", remarkName) : "");
    }
}
